/*
 *  Copyright (c) 2005-2009, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
 *
 *  WSO2 Inc. licenses this file to you under the Apache License,
 *  Version 2.0 (the "License"); you may not use this file except
 *  in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *  http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing,
 *  software distributed under the License is distributed on an
 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 *  KIND, either express or implied.  See the License for the
 *  specific language governing permissions and limitations
 *  under the License.
 *
 */
package org.wso2.carbon.registry.core.jdbc.dataaccess;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.registry.core.dataaccess.ClusterLock;
import org.wso2.carbon.registry.core.dataaccess.DataAccessManager;
import org.wso2.carbon.registry.core.exceptions.RegistryException;
import org.wso2.carbon.utils.DBUtils;

import javax.sql.DataSource;
import java.sql.*;

/**
 * A {@link ClusterLock} implementation to avoid database schema creation and data population from
 * multiple nodes for JDBC-base databases.
 */
public class JDBCClusterLock implements ClusterLock {

    private static final Log log = LogFactory.getLog(JDBCClusterLock.class);
    private static final Object lockObject = new Object();
    private static final int lockWaitTime = 5000;
    private static final String defaultClusterLockTableStatement =
            "CREATE TABLE REG_CLUSTER_LOCK (" +
                    "REG_LOCK_NAME VARCHAR (20)," +
                    "REG_LOCK_STATUS VARCHAR (20)," +
                    "PRIMARY KEY (REG_LOCK_NAME))";
    private static final String initRow = "INSERT INTO REG_CLUSTER_LOCK " +
            "(REG_LOCK_NAME, REG_LOCK_STATUS) VALUES ('INITIALIZE', 'UNLOCKED')";
    private static final String LOCK_ROW_CHECK = "SELECT COUNT(REG_LOCK_NAME) FROM " +
            "REG_CLUSTER_LOCK WHERE REG_LOCK_NAME='INITIALIZE'";
    //private static DataSource dataSource;

    private static ThreadLocal<DataSource> dataSource =
            new ThreadLocal<DataSource>() {
                protected DataSource initialValue() {
                    return null;
                }
            };

    private static DataSource getDataSource() {
        return dataSource.get();
    }

    private static void setDataSource(DataSource input) {
        dataSource.set(input);
    }

    public void init(DataAccessManager dataAccessManager,
                                  String clusterLockTableStatement)
            throws RegistryException {
        synchronized (lockObject) {
            String clusterLockTable;
            if (log.isTraceEnabled()) {
                log.trace("Initializing cluster wide database locks");
            }
            if (!(dataAccessManager instanceof JDBCDataAccessManager)) {
                String msg = "Failed to get logs. Invalid data access manager.";
                log.error(msg);
                throw new RegistryException(msg);
            }
            setDataSource(((JDBCDataAccessManager)dataAccessManager).getDataSource());

            if (clusterLockTableStatement != null) {
                clusterLockTable = clusterLockTableStatement;
            } else {
                clusterLockTable = defaultClusterLockTableStatement;
            }

            Connection conn = null;

            try {
                conn = getDataSource().getConnection();

                DatabaseMetaData metaData = conn.getMetaData();
                ResultSet result = metaData.getTables(null, null,
                        DBUtils.getConvertedAutoGeneratedColumnName(metaData
                                .getDatabaseProductName(), "REG_CLUSTER_LOCK"), null);
                boolean lockTableCreated = false;
                try {
                    if (result.next()) {
                        if (log.isTraceEnabled()) {
                            log.trace("Cluster lock table is already created in the Registry " +
                                    "database.");
                        }
                        lockTableCreated = true;
                    }
                } finally {
                    if (result != null) {
                        result.close();
                    }
                }

                if (!lockTableCreated) {
                    PreparedStatement ps1 = conn.prepareStatement(clusterLockTable);
                    try {
                        ps1.executeUpdate();
                    } finally {
                        if (ps1 != null) {
                            ps1.close();
                        }
                    }
                }

                int rowCount = 0;
                PreparedStatement psRowCheck = conn.prepareStatement(LOCK_ROW_CHECK);
                try {
                    ResultSet rowCheckResults = psRowCheck.executeQuery();
                    try {
                        if (rowCheckResults.next()) {
                            rowCount = rowCheckResults.getInt(1);
                        }
                    } finally {
                        if (rowCheckResults != null) {
                            rowCheckResults.close();
                        }
                    }
                } finally {
                    if (psRowCheck != null) {
                        psRowCheck.close();
                    }
                }

                // lock table is not created. let's try to create it. we may fail if another node
                // creates this table concurrently.
                conn.setAutoCommit(false);


                // we can't assume that lock row is created even though the lock table is already
                // created. this is because, lock table can be created from a script.
                if (rowCount == 0) {
                    PreparedStatement ps2 = conn.prepareStatement(initRow);
                    try {
                        ps2.executeUpdate();
                    } finally {
                        if (ps2 != null) {
                            ps2.close();
                        }
                    }
                }

                conn.commit();

            } catch (SQLException e) {

                String msg = "Attempt create the cluster lock table is unsuccessful. " +
                        "Examining the reasons to failure. " +
                        "(Ignore the below error log if this Registry instance is " +
                        "running in a cluster).";
                log.error(msg, e);

                boolean clusterLockTableCreated = false;
                if (conn != null) {

                    try {
                        DatabaseMetaData metaData = conn.getMetaData();
                        ResultSet result = metaData.getTables(null, null,
                                DBUtils.getConvertedAutoGeneratedColumnName(metaData
                                        .getDatabaseProductName(),
                                        "REG_CLUSTER_LOCK"), null);
                        try {
                            if (result.next()) {
                                clusterLockTableCreated = true;
                                if (log.isTraceEnabled()) {
                                    log.trace("Cluster lock table is created by another node in " +
                                            "the cluster. Cluster lock table creation is " +
                                            "successful.");
                                }
                            }
                        } finally {
                            if (result != null) {
                                result.close();
                            }
                        }
                    } catch (SQLException e1) {

                        String msg1 = "Failed to check the existence of the cluster lock table. " +
                                "Caused by: " + e1.getMessage();
                        log.error(msg1, e1);
                    }

                    if (!clusterLockTableCreated) {

                        String msg1 = "Failed to create the cluster lock table. Cluster lock " +
                                "table is not created by any other node. Caused by: " +
                                e.getMessage();
                        log.fatal(msg1, e);

                        try {
                            conn.rollback();
                        } catch (SQLException e1) {
                            String msg2 = "Failed to rollback the database operation after " +
                                    "failing the cluster lock table creation. Caused by: " +
                                    e1.getMessage();
                            log.error(msg2, e1);
                        }

                        throw new RegistryException(msg1, e);
                    }
                } else {
                    String msg1 = "Failed to obtain database connection to create the cluster " +
                            "lock table. Caused by: " + e.getMessage();
                    log.error(msg1, e);
                    throw new RegistryException(msg1, e);
                }

            } finally {
                try {
                    if (conn != null) {
                        conn.close();
                    }
                } catch (SQLException e) {
                    log.error("Failed to close the database connection. Caused by: "
                            + e.getMessage(), e);
                }
            }

            if (log.isTraceEnabled()) {
                log.trace("Cluster wide database locks initialized successfully.");
            }
        }
    }

    public void lock(String lockName) throws RegistryException {

        if (log.isTraceEnabled()) {
            log.trace("Attempting to obtain cluster wide database lock " + lockName + ".");
        }

        boolean lockObtained = obtainLock(lockName);

        while (!lockObtained) {

            if (log.isTraceEnabled()) {
                log.trace("Shared database lock is obtained by some other node. " +
                        "Waiting for lock to be released.");
            }

            try {
                Thread.sleep(lockWaitTime);
            } catch (InterruptedException e) {
                String msg = "Failed to suspend the thread till shared database lock " +
                        "is obtained. Caused by: " + e.getMessage();
                log.error(msg, e);
            }

            lockObtained = obtainLock(lockName);
        }

        if (log.isTraceEnabled()) {
            log.trace("Cluster wide database lock " + lockName + " obtained successfully.");
        }
    }

    public void unlock(String lockName) throws RegistryException {

        if (log.isTraceEnabled()) {
            log.trace("Attempting to release cluster wide database lock " + lockName + ".");
        }

        Connection conn = null;
        try {
            conn = getDataSource().getConnection();

            String lockRow = "UPDATE REG_CLUSTER_LOCK SET REG_LOCK_STATUS='UNLOCKED' " +
                    "WHERE REG_LOCK_NAME=? AND REG_LOCK_STATUS='LOCKED'";
            PreparedStatement ps2 = conn.prepareStatement(lockRow);
            ps2.setString(1, lockName);
            ps2.executeUpdate();
            ps2.close();

        } catch (SQLException e) {

            String msg = "Error occurred while trying to release the shared database lock: " +
                    lockName + ". Caused by: " + e.getMessage();
            log.error(msg, e);
            throw new RegistryException(msg, e);

        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException e) {
                    log.error(e);
                }
            }
        }

        if (log.isTraceEnabled()) {
            log.trace("Cluster wide database lock " + lockName + " released successfully.");
        }
    }

    /**
     * A utility method used to obtain a lock
     *
     * @param lockName the name of the lock.
     *
     * @return whether the lock was obtained or not.
     * @throws RegistryException if an exception occurred during the process of obtaining a lock.
     */
    private static boolean obtainLock(String lockName) throws RegistryException {

        boolean lockObtained = false;

        Connection conn = null;
        try {
            conn = getDataSource().getConnection();

            String lockRow = "UPDATE REG_CLUSTER_LOCK SET REG_LOCK_STATUS='LOCKED' " +
                    "WHERE REG_LOCK_NAME=? AND REG_LOCK_STATUS='UNLOCKED'";
            PreparedStatement ps2 = conn.prepareStatement(lockRow);
            ps2.setString(1, lockName);
            int updatedRows = ps2.executeUpdate();
            ps2.close();

            if (updatedRows > 0) {
                lockObtained = true;
            }

            return lockObtained;

        } catch (SQLException e) {

            String msg = "Error occurred while trying to obtain a shared database lock: " +
                    lockName + ". Caused by: " + e.getMessage();
            log.error(msg, e);
            throw new RegistryException(msg, e);

        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException e) {
                    log.error(e);
                }
            }
        }
    }
}